﻿<h1>一些常见并发编程错误</h1>

<p>
Go语言是一门天然支持并发的编程语言。
通过使用<code>go</code>关键字，我们可以很轻松地创建协程；通过<a href="channel-use-cases.html">使用</a><a href="channel.html">通道</a>和<a href="concurrent-atomic-operation.html">Go中提供的</a>其它<a href="concurrent-synchronization-more.html">各种同步技术</a>，并发编程变得简单、轻松和有趣。
</p>

<p>
另一方面，Go并不阻止程序员在并发编程中因为粗心或者经验不足而犯错。
本文的余下部分将展示一些常见的并发错误，来帮助Go程序员在实践中避免这些错误。
</p>

<h3>当需要同步的时候没有同步</h3>

<div>
<p>
我们已经知道，源文件中的代码行在运行时刻<a href="memory-model.html">并非总是按照它们的出现次序被执行</a>。
</p>

下面这个示例程序犯了两个错误：
<ul>
<li>
	首先，主协程中对变量<code>b</code>的读取和匿名协程中的对变量<code>b</code>的写入可能会产生数据竞争；
</li>
<li>
	其次，在主协程中，条件<code>b == true</code>成立并不能确保条件<code>a != nil</code>也成立。
	编译器和CPU可能会对<a href="memory-model.html">调整此程序中匿名协程中的某些指令的顺序</a>已获取更快的执行速度。
	所以，站在主协程的视角看，对变量<code>b</code>的赋值可能会发生在对变量<code>a</code>的赋值之前，这将造成在修改<code>a</code>的元素时<code>a</code>依然为一个nil切片。
</li>
</ul>

<pre class="line-numbers"><code class="language-go">package main

import (
	"time"
	"runtime"
)

func main() {
	var a []int // nil
	var b bool  // false

	// 一个匿名协程。
	go func () {
		a = make([]int, 3)
		b = true // 写入b
	}()

	for !b { // 读取b
		time.Sleep(time.Second)
		runtime.Gosched()
	}
	a[0], a[1], a[2] = 0, 1, 2 // 可能会发生恐慌
}
</code></pre>

<p>
上面这个程序可能在很多计算机上运行良好，但是可能会在某些计算机上因为恐慌而崩溃退出；或者使用某些编译器编译的时候运行良好，但使用另外的某个编译器编译的时候将造成程序运行时崩溃退出。
</p>

我们应该使用通道或者<code>sync</code>标准库包中的同步技术来确保内存顺序。比如：

<pre class="line-numbers"><code class="language-go">package main

func main() {
	var a []int = nil
	c := make(chan struct{})

	go func () {
		a = make([]int, 3)
		c <- struct{}{}
	}()

	<-c
	a[0], a[1], a[2] = 0, 1, 2 // 绝不会造成恐慌
}
</code></pre>

<p>
</p>
</div>

<a class="anchor" id="sleep"></a>
<h3>使用<code>time.Sleep</code>调用来做同步</h3>

<div>

让我们看一个简单的例子：

<pre class="line-numbers"><code class="language-go">package main

import (
	"fmt"
	"time"
)

func main() {
	var x = 123

	go func() {
		x = 789 // 写入x
	}()

	time.Sleep(time.Second)
	fmt.Println(x) // 读取x
}
</code></pre>

<p>
我们期望着此程序打印出<code>789</code>。
事实上，则其运行结果常常正如我们所期待的。
但是，此程序中的同步处理实现的正确吗？否！原因很简单，Go运行时并不能保证对<code>x</code>的写入一定发生在对<code>x</code>的读取之前。
在某些特定的情形下，比如CPU资源被很一些其它计算密集的程序所占用，则对<code>x</code>的写入有可能发生在对<code>x</code>的读取之后。
因此，我们不应该在正式的项目中使用<code>time.Sleep</code>调用来做同步。
</p>

<!--
https://groups.google.com/forum/#!topic/golang-nuts/J3kXXZivlHA
-->

让我们看另一个简单的例子：

<pre class="line-numbers"><code class="language-go">package main

import (
	"fmt"
	"time"
)

var x = 0

func main() {
	var num = 123
	var p = &num

	c := make(chan int)

	go func() {
		c <- *p + x
	}()

	time.Sleep(time.Second)
	num = 789
	fmt.Println(<-c)
}
</code></pre>

<p>
你觉得此程序会输出什么？<code>123</code>还是<code>789</code>？
事实上，它的输出是和具体使用的编译器相关的。
对于标准编译器1.17版本来说，它很可能输出<code>123</code>。
但是从理论上说，它输出<code>789</code>或者另外一个预想不到的值也是有可能的。
</p>

<p>
让我们将此例中的<code>c &lt;- *p + x</code>一行换成<code>c &lt;- *p</code>，然后重新运行它，你将会发现它的输出变成了<code>789</code>（如果它使用标准编译器1.17版本编译的话）。
重申一次，此结果是和具体使用的编译器和编译器的版本相关的。
</p>

<p>
是的，此程序中存在数据竞争。表达式<code>*p</code>的估值可能发生在赋值<code>num = 789</code>之前、之后、或者同时。
<code>time.Sleep</code>调用并不能保证<code>*p</code>的估值发生在此赋值之后。
</p>

对于这个特定的例子，我们应该将欲发送的值在开启新协程之前存储在一个临时变量中来避免数据竞争。

<pre class="line-numbers"><code class="language-go">...
	tmp := *p
	go func() {
		c <- tmp
	}()
...
</code></pre>

<p>
</p>

</div>

<h3>使一些协程永久处于阻塞状态</h3>

<div>
有很多原因导致某个协程永久阻塞，比如：
<ul>
<li>
	从一个永远不会有其它协程向其发送数据的通道接收数据；
</li>
<li>
	向一个永远不会有其它协程从中读取数据的通道发送数据；
</li>
<li>
	被自己死锁了；
</li>
<li>
	和其它协程相互死锁了；
</li>
<li>
	等等。
</li>
</ul>

<p>
除了有时我们故意地将主协程永久阻塞以防止程序退出外，其它大多数造成协程永久阻塞的情况都不是我们所期待的。
Go运行时很难分辨出一个处于阻塞状态的协程是否将永久阻塞下去，所以Go运行时不会释放永久处于阻塞状态的协程占用的资源。
</p>

在<a href="channel-use-cases.html#first-response-wins">采用最快回应</a>通道用例中，如果被当作future/promise来用的通道的容量不足够大，则较慢回应的协程在准备发送回应结果时将永久阻塞。
比如，下面的例子中，每个请求将导致4个协程永久阻塞。

<pre class="line-numbers"><code class="language-go">func request() int {
	c := make(chan int)
	for i := 0; i < 5; i++ {
		i := i
		go func() {
			c <- i // 4个协程将永久阻塞在这里
		}()
	}
	return <-c
}
</code></pre>

<p>
为了防止有4个协程永久阻塞，被当作future/promise使用的通道的容量必须至少为<code>4</code>.
</p>

在<a href="channel-use-cases.html#first-response-wins-2">第二种“采用最快回应”实现方法</a>中，如果被当作future/promise使用的通道是一个非缓冲通道（如下面的代码所示），则有可能导致其通道的接收者可能会错过所有的回应而导致处于永久阻塞状态。

<pre class="line-numbers"><code class="language-go">func request() int {
	c := make(chan int)
	for i := 0; i < 5; i++ {
		i := i
		go func() {
			select {
			case c <- i:
			default:
			}
		}()
	}
	return <-c // 有可能永久阻塞在此
}
</code></pre>

<p>
接收者协程可能会永久阻塞的原因是如果5个尝试发送操作都发生在接收操作<code>&lt;-c</code>准备好之前，亦即5个个尝试发送操作都失败了，则接收者协程将永远无值可接收（从而将处于永久阻塞状态）。
</p>

<p>
将通道<code>c</code>改为一个缓冲通道，则至少会有一个尝试发送将成功，从而接收者协程肯定不会永久阻塞。
</p>

</div>

<h3>复制<code>sync</code>标准库包中的类型的值</h3>

<div>
<p>
在实践中，<code>sync</code>标准库包中的类型（除了<code>Locker</code>接口类型）的值不应该被复制。
我们只应该复制它们的指针值。
</p>

下面是一个有问题的并发编程的例子。
在此例子中，当<code>Counter.Value</code>方法被调用时，一个<code>Counter</code>属主值将被复制，此属主值的字段<code>Mutex</code>也将被一同复制。
此复制并没有被同步保护，因此复制结果可能是不完整的，并非被复制的属主值的一个快照。
即使此<code>Mutex</code>字段得以侥幸完整复制，它的副本所保护的是对字段<code>n</code>的一个副本的访问，因此一般是没有意义的。

<pre class="line-numbers"><code class="language-go">import "sync"

type Counter struct {
	sync.Mutex
	n int64
}

// 此方法实现是没问题的。
func (c *Counter) Increase(d int64) (r int64) {
	c.Lock()
	c.n += d
	r = c.n
	c.Unlock()
	return
}

// 此方法的实现是有问题的。当它被调用时，
// 一个Counter属主值将被复制。
func (c Counter) Value() (r int64) {
	c.Lock()
	r = c.n
	c.Unlock()
	return
}
</code></pre>

<p>
我们应该将<code>Value</code>方法的属主参数类型更改为指针类型<code>*Counter</code>来避免复制<code>sync.Mutex</code>值。
</p>

<p>
Go官方工具链中提供的<code>go vet</code>命令将提示此例中的<code>Value</code>方法的声明可能是一个潜在的逻辑错误。
</p>

</div>

<h3>在错误的地方调用<code>sync.WaitGroup.Add</code>方法</h3>

<div>

<p>
每个<code>sync.WaitGroup</code>值内部维护着一个计数。此计数的初始值为0。
如果一个<code>sync.WaitGroup</code>值的<code>Wait</code>方法在此计数为0的时候被调用，则此调用不会阻塞，否则此调用将一直阻塞到此计数变为0为止。
</p>

<p>
为了让一个<code>WaitGroup</code>值的使用有意义，在此值的计数为0的情况下，对它的下一次<code>Add</code>方法的调用必须出现在对它的下一次<code>Wait</code>方法的调用之前。
</p>

比如，在下面的例子中，<code>Add</code>方法的调用位置是不合适的。
此例子程序的打印结果并不总是<code>100</code>，而可能是<code>0</code>到<code>100</code>间的任何一个值。
原因是没有任何一个<code>Add</code>方法调用可以确保发生在唯一的<code>Wait</code>方法调用之前，结果导致没有任何一个<code>Done</code>方法调用可以确保发生在唯一的<code>Wait</code>方法调用返回之前。

<pre class="line-numbers"><code class="language-go">package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var wg sync.WaitGroup
	var x int32 = 0
	for i := 0; i < 100; i++ {
		go func() {
			wg.Add(1)
			atomic.AddInt32(&x, 1)
			wg.Done()
		}()
	}

	fmt.Println("等待片刻...")
	wg.Wait()
	fmt.Println(atomic.LoadInt32(&x))
}
</code></pre>

<p>
</p>

我们应该将对<code>Add</code>方法的调用移出匿名协程之外，像下面这样，使得任何一个<code>Done</code>方法调用都确保发生在唯一的<code>Wait</code>方法调用返回之前。

<pre class="line-numbers"><code class="language-go">...
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			atomic.AddInt32(&x, 1)
			wg.Done()
		}()
	}
...
</code></pre>

<p>
</p>

</div>

<h3>不当地使用用做Future/Promise的通道</h3>

<div>

从<a href="channel-use-cases.html#future-promise">通道用例大全</a>一文中，我们了解到一些函数可以返回用做future/promise的通道结果。
假设<code>fa</code>和<code>fb</code>是这样的两个函数，则下面的调用方式并没有体现出这两个函数的真正价值。

<pre class="line-numbers"><code class="language-go">doSomethingWithFutureArguments(<-fa(), <-fb())
</code></pre>

<p>
在上面这行调用中，两个实参值（promise回应结果）的生成实际上是串行进行的，future/promise的价值没有体现出来。
</p>

我们应该像下面这样调用这两个函数来并发生成两个回应结果：

<pre class="line-numbers"><code class="language-go">ca, cb := fa(), fb()
doSomethingWithFutureArguments(<-ca, <-cb)
</code></pre>

<p>
</p>

</div>

<h3>没有让最后一个活跃的发送者关闭通道</h3>

<div>

<p>
Go程序员常犯的一个错误是关闭一个后续可能还会有协程向其发送数据的通道。
当向一个已关闭的通道发送数据的时候，一个恐慌将产生。
</p>

<p>
这样的错误曾经发生在一些很有名的项目中，比如Kubernetes项目中的<a href="https://github.com/kubernetes/kubernetes/pull/45291/files?diff=split">这个bug</a>和<a href="https://github.com/kubernetes/kubernetes/pull/39479/files?diff=split">这个bug</a>。
</p>

<p>
请阅读<a href="channel-closing.html">此篇文章</a>来了解如何安全和优雅地关闭通道。
</p>

</div>


<h3>对地址不保证为8字节对齐的值执行64位原子操作</h3>

<div>
<p>
截至目前（Go 1.17），64位原子操作中涉及到的实参地址必须为8字节对齐的。不满足此条件的64位原子操作将造成一个恐慌。
对于标准编译器，这样的情形只<a href="https://golang.google.cn/pkg/sync/atomic/#pkg-note-BUG">可能发生在32位的架构中</a>。
请阅读<a href="memory-layout.html">内存布局一文</a>来获知如何确保让64位的整数值的地址在32位的架构中8字节对齐。
</p>

</div>

<h3>没留意过多的<code>time.After</code>函数调用消耗了大量资源</h3>

<div>
<p>
<code>time</code>标准库包中的<code>After</code>函数返回<a href="channel-use-cases.html#timer">一个用做延迟通知的通道</a>。
此函数给并发编程带来了很多便利，但是它的每个调用都需要创建一个<code>time.Timer</code>值，此新创建的<code>Timer</code>值在传递给<code>After</code>函数调用的时长（实参）内肯定不会被垃圾回收。
如果此函数在某个时段内被多次频繁调用，则可能导致积累很多尚未过期的<code>Timer</code>值从而造成大量的内存和计算消耗。
</p>

比如在下面这个例子中，如果<code>longRunning</code>函数被调用并且在一分钟内有一百万条消息到达，
那么在某个特定的很小时间段（大概若干秒）内将存在一百万个活跃的<code>Timer</code>值，即使其中只有一个是真正有用的。

<pre class="line-numbers"><code class="language-go">import (
	"fmt"
	"time"
)

// 如果某两个连续的消息的间隔大于一分钟，此函数将返回。
func longRunning(messages <-chan string) {
	for {
		select {
		case <-time.After(time.Minute):
			return
		case msg := <-messages:
			fmt.Println(msg)
		}
	}
}
</code></pre>

<p>
</p>

为了避免太多的<code>Timer</code>值被创建，我们应该只使用（并复用）一个<code>Timer</code>值，像下面这样：

<pre class="line-numbers"><code class="language-go">func longRunning(messages <-chan string) {
	timer := time.NewTimer(time.Minute)
	defer timer.Stop()

	for {
		select {
		case <-timer.C: // 过期了
			return
		case msg := <-messages:
			fmt.Println(msg)

			// 此if代码块很重要。
			if !timer.Stop() {
				<-timer.C
			}
		}

		// 必须重置以复用。
		timer.Reset(time.Minute)
	}
}
</code></pre>

<p>
注意，此示例中的<code>if</code>代码块用来舍弃一个可能在执行第二个分支代码块的时候发送过来的超时通知。
</p>

</div>

<h3>不正确地使用<code>time.Timer</code>值</h3>

<div>

一个典型的<code>time.Timer</code>的使用已经在上一节中展示了。一些解释：
<ul>
<li>
	如果一个<code>Timer</code>值已经过期或者已经被终止（stopped），则相应的<code>Stop</code>方法调用返回<code>false</code>。
	在此<code>Timer</code>值尚未终止的时候，<code>Stop</code>方法调用返回<code>false</code>只能意味着此<code>Timer</code>值已经过期。
</li>
<li>
	一个<code>Timer</code>值被终止之后，它的通道字段<code>C</code>最多只能含有一个过期的通知。
</li>
<li>
	在一个<code>Timer</code>终止（stopped）之后并且在重置和重用此<code>Timer</code>值之前，我们应该确保此<code>Timer</code>值中肯定不存在过期的通知。
	这就是上一节中的例子中的<code>if</code>代码块的意义所在。
</li>
</ul>

<p>
一个<code>*Timer</code>值的<code>Reset</code>方法必须在对应<code>Timer</code>值过期或者终止之后才能被调用；
否则，此<code>Reset</code>方法调用和一个可能的向此<code>Timer</code>值的<code>C</code>通道字段的发送通知操作产生数据竞争。
</p>

<p>
如果上一节中的例子中的<code>select</code>流程控制代码块中的第一个分支被选中，则这表示相应的<code>Timer</code>值已经过期，所以我们不必终止它。
但是我们必须在第二个分支中通过终止此<code>Timer</code>以检查此<code>Timer</code>中是否存在一个过期的通知。
如果确实有一个过期的通知，我们必须在重用这个<code>Timer</code>之前将此过期的通知取出；否则，此过期的通知将下一个循环步导致在第一个分支立即被选中。
</p>

比如，下面这个程序将在运行后大概一秒钟（而不是十秒钟）后退出。
而且此程序存在着潜在的数据竞争。

<pre class="line-numbers"><code class="language-go">package main

import (
	"fmt"
	"time"
)

func main() {
	start := time.Now()
	timer := time.NewTimer(time.Second/2)
	select {
	case <-timer.C:
	default:
		time.Sleep(time.Second) // 此分支被选中的可能性较大
	}
	timer.Reset(time.Second * 10) // 可能数据竞争
	<-timer.C
	fmt.Println(time.Since(start)) // 大约1s
}
</code></pre>

<p>
当一个<code>time.Timer</code>值不再被使用后，我们不必（但是推荐）终止之。
</p>

<p>
在多个协程中使用同一个<code>time.Timer</code>值比较容易写出不当的并发代码，所以尽量不要跨协程使用一个<code>Timer</code>值。
</p>

<p>
我们不应该依赖于<code>time.Timer</code>的<code>Reset</code>方法的返回值。此返回值只要是为了历史兼容性而存在的。
</p>
</div>
